package spark.movie;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
import spark.secondsort.SecondSort;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;

/**
 * Film review system: analyses over the MovieLens ml-1m data set (users.dat, movies.dat,
 * ratings.dat) implemented with the Spark RDD API.
 *
 * @author jiantao7
 * @create 2018-05-16 16:28
 */
public class FilmReviewSystemThroughRDD {

    /**
     * Data directory used when none is passed as the first program argument
     * (the location that used to be hard-coded in {@link #main}).
     */
    private static final String DEFAULT_DATA_DIR = "E:\\data\\ml-1m\\";

    /**
     * Formatter for rating timestamps, rendered in the JVM's default time zone.
     * DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    /**
     * Reduce function that adds two (ratingSum, ratingCount) pairs component-wise.
     * Shared by every average-rating aggregation in this class.
     */
    private static final Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>
            SUM_RATINGS_AND_COUNTS =
            new Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>() {
                @Override
                public Tuple2<Double, Integer> call(Tuple2<Double, Integer> v1, Tuple2<Double, Integer> v2)
                        throws Exception {
                    return new Tuple2<Double, Integer>(v1._1 + v2._1, v1._2 + v2._2);
                }
            };

    /**
     * Loads the ml-1m users, movies and ratings files and runs the currently enabled analysis.
     *
     * @param args optional; {@code args[0]} is the directory containing users.dat, movies.dat and
     *             ratings.dat. Defaults to {@value #DEFAULT_DATA_DIR} when absent or blank.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("FilmReviewSystem")
                .master("local[*]").getOrCreate();
        try {
            // With a SparkSession, the JavaSparkContext has to be obtained from its SparkContext like this.
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            sc.setLogLevel("warn");

            String dataDir = args.length > 0 && !args[0].trim().isEmpty()
                    ? withTrailingSeparator(args[0].trim())
                    : DEFAULT_DATA_DIR;

            // users.dat lines: UserID::Gender::Age::Occupation::Zip-code
            JavaRDD<String> userRDD = sc.textFile(dataDir + "users.dat", 1);
            // movies.dat lines: MovieID::Title::Genres
            JavaRDD<String> moviesRDD = sc.textFile(dataDir + "movies.dat", 1);
            // ratings.dat lines: UserID::MovieID::Rating::Timestamp
            JavaRDD<String> ratingsRDD = sc.textFile(dataDir + "ratings.dat", 1);

            // [movieId, movieName]
            JavaPairRDD<String, String> moviesInfo = moviesRDD.mapToPair(new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String lines) throws Exception {
                    String[] split = lines.split("::");
                    String movieId = split[0];
                    String movieName = split[1];
                    // TODO: malformed lines are not validated.
                    return new Tuple2<String, String>(movieId, movieName);
                }
            });

            // (userId, movieId, rating) for every rating record
            JavaRDD<Tuple3<String, String, Double>> ratingsInfo = ratingsRDD.map(
                    new Function<String, Tuple3<String, String, Double>>() {
                        @Override
                        public Tuple3<String, String, Double> call(String lines) throws Exception {
                            String[] split = lines.split("::");
                            String userId = split[0];
                            String movieId = split[1];
                            double ratings = Double.parseDouble(split[2]);
                            // Typed constructor: the raw `new Tuple3(...)` was an unchecked conversion.
                            return new Tuple3<String, String, Double>(userId, movieId, ratings);
                        }
                    });

            // [userId, Tuple4<gender, age, occupation, zipCode>]
            JavaPairRDD<String, Tuple4<String, Integer, String, String>> userInfo = userRDD.mapToPair(
                    new PairFunction<String, String, Tuple4<String, Integer, String, String>>() {
                        @Override
                        public Tuple2<String, Tuple4<String, Integer, String, String>> call(String lines)
                                throws Exception {
                            String[] strings = lines.split("::");
                            String userId = strings[0];
                            String gender = strings[1];
                            int age = Integer.parseInt(strings[2]);
                            String occupation = strings[3];
                            String zipCode = strings[4];

                            Tuple4<String, Integer, String, String> value =
                                    new Tuple4<String, Integer, String, String>(gender, age, occupation, zipCode);
                            return new Tuple2<String, Tuple4<String, Integer, String, String>>(userId, value);
                        }
                    });

            // Select the analysis to run. RDD transformations are lazy, so RDDs that no enabled
            // analysis consumes are never computed.
//            getTop10AvgUserRatings(moviesInfo, ratingsInfo);

//            getTop10MostPopularMoviesByMaleAndFemale(moviesInfo, ratingsRDD, userInfo);
            getRatingsTop10SecondSort(moviesInfo, ratingsRDD);
        } finally {
            // Release the local Spark context even when an analysis throws.
            spark.stop();
        }
    }

    /**
     * Returns {@code dir} ending in a path separator so file names can be appended directly.
     * A forward slash is appended when the directory has no trailing '/' or '\'.
     */
    private static String withTrailingSeparator(String dir) {
        return dir.endsWith("/") || dir.endsWith("\\") ? dir : dir + "/";
    }

    /**
     * Secondary sort of the rating records. Each rating is keyed by a {@link SecondSort} built from
     * its timestamp and rating and sorted in descending order. The first 10 are printed as
     * "time rating movieName". The actual ordering is defined by SecondSort's compareTo; the intent
     * is descending by timestamp and then by rating (confirm in SecondSort).
     *
     * @param movieIdNameInfo [movieId, movieName]
     * @param ratingsRDD      raw ratings.dat lines, UserID::MovieID::Rating::Timestamp
     */
    private static void getRatingsTop10SecondSort(JavaPairRDD<String, String> movieIdNameInfo, JavaRDD<String> ratingsRDD) {
        // [movieId, Tuple2<rating, timestamp>]
        JavaPairRDD<String, Tuple2<Integer, Long>> movieIdRatingsTimestampInfo = ratingsRDD.mapToPair(
                new PairFunction<String, String, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<String, Tuple2<Integer, Long>> call(String lines) throws Exception {
                        String[] strings = lines.split("::");
                        String movieId = strings[1];
                        int ratings = Integer.parseInt(strings[2]);
                        long timestamp = Long.parseLong(strings[3]);
                        Tuple2<Integer, Long> value = new Tuple2<Integer, Long>(ratings, timestamp);
                        return new Tuple2<String, Tuple2<Integer, Long>>(movieId, value);
                    }
                }
        );

        // [movieId, Tuple2<movieName, Tuple2<rating, timestamp>>]
        JavaPairRDD<String, Tuple2<String, Tuple2<Integer, Long>>> movieIdNameRatingRDD =
                movieIdNameInfo.join(movieIdRatingsTimestampInfo);

        // Build the secondary-sort key: [SecondSort(timestamp, rating), movieName]
        JavaPairRDD<SecondSort, String> secondSortKeyRDD = movieIdNameRatingRDD.mapToPair(
                new PairFunction<Tuple2<String, Tuple2<String, Tuple2<Integer, Long>>>, SecondSort, String>() {
                    @Override
                    public Tuple2<SecondSort, String> call(Tuple2<String, Tuple2<String, Tuple2<Integer, Long>>> tp2)
                            throws Exception {
                        SecondSort secondSort = new SecondSort(tp2._2._2._2(), tp2._2._2._1());
                        String movieName = tp2._2._1;
                        return new Tuple2<SecondSort, String>(secondSort, movieName);
                    }
                }
        );

        List<Tuple2<SecondSort, String>> top10 = secondSortKeyRDD.sortByKey(false).take(10);

        for (Tuple2<SecondSort, String> tp2 : top10) {
            // ml-1m timestamps are seconds since the epoch (per the dataset README). They must be
            // converted with ofEpochSecond, not passed to a millisecond-based API such as new Date(long).
            String timestamp = TIMESTAMP_FORMATTER.format(Instant.ofEpochSecond(tp2._1.getTimestamp()));
            int ratings = tp2._1.getRatings();

            String movieName = tp2._2;

            System.out.println(timestamp + "     " + ratings + "      " + movieName);
        }
    }

    /**
     * Finds the 10 movies with the highest average rating among female users and among male users.
     * For each group, prints the average rating followed by the movie name.
     *
     * @param moviesInfo [movieId, movieName]
     * @param ratingsRDD raw ratings.dat lines, UserID::MovieID::Rating::Timestamp
     * @param userInfo   [userId, Tuple4<gender, age, occupation, zipCode>]
     */
    private static void getTop10MostPopularMoviesByMaleAndFemale(JavaPairRDD<String, String> moviesInfo,
                                                                 JavaRDD<String> ratingsRDD,
                                                                 JavaPairRDD<String, Tuple4<String, Integer, String, String>> userInfo) {
        // [movieId, Tuple2<userId, rating>]
        JavaPairRDD<String, Tuple2<String, Double>> ratingsInfo = ratingsRDD.mapToPair(
                new PairFunction<String, String, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Tuple2<String, Double>> call(String lines) throws Exception {
                        String[] split = lines.split("::");
                        String userId = split[0];
                        String movieId = split[1];
                        double ratings = Double.parseDouble(split[2]);
                        Tuple2<String, Double> value = new Tuple2<String, Double>(userId, ratings);
                        return new Tuple2<String, Tuple2<String, Double>>(movieId, value);
                    }
                });

        // Join on movieId: [movieId, Tuple2<Tuple2<userId, rating>, movieName>]
        JavaPairRDD<String, Tuple2<Tuple2<String, Double>, String>> movieIdJoinRDD = ratingsInfo.join(moviesInfo);

        // Re-key by user: [userId, Tuple2<movieName, rating>]
        JavaPairRDD<String, Tuple2<String, Double>> userIdPairRDD = movieIdJoinRDD.mapToPair(
                new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, String>>, String, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Tuple2<String, Double>> call(Tuple2<String, Tuple2<Tuple2<String, Double>, String>> tp2)
                            throws Exception {
                        String userId = tp2._2._1._1;
                        String movieName = tp2._2._2;
                        double ratings = tp2._2._1._2;

                        Tuple2<String, Double> value = new Tuple2<String, Double>(movieName, ratings);
                        return new Tuple2<String, Tuple2<String, Double>>(userId, value);
                    }
                });

        // Join with userInfo: [userId, Tuple2<Tuple2<movieName, rating>, Tuple4<gender, age, occupation, zipCode>>]
        JavaPairRDD<String, Tuple2<Tuple2<String, Double>, Tuple4<String, Integer, String, String>>> userIdJoinPairRDD =
                userIdPairRDD.join(userInfo);

        // [userId, Tuple3<movieName, gender, rating>]
        JavaPairRDD<String, Tuple3<String, String, Double>> userIdMappingRDD = userIdJoinPairRDD.mapToPair(
                new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, Tuple4<String, Integer, String, String>>>, String, Tuple3<String, String, Double>>() {
                    @Override
                    public Tuple2<String, Tuple3<String, String, Double>> call(
                            Tuple2<String, Tuple2<Tuple2<String, Double>, Tuple4<String, Integer, String, String>>> tp2)
                            throws Exception {
                        String userId = tp2._1;
                        String movieName = tp2._2._1._1;
                        double ratings = tp2._2._1._2;
                        String gender = tp2._2._2._1();
                        Tuple3<String, String, Double> value =
                                new Tuple3<String, String, Double>(movieName, gender, ratings);
                        return new Tuple2<String, Tuple3<String, String, Double>>(userId, value);
                    }
                });

        // Top 10 per gender, highest average rating first: [avgRating, movieName]
        List<Tuple2<Double, String>> femaleList = topAverageRatedMoviesByGender(userIdMappingRDD, "F", 10);
        List<Tuple2<Double, String>> maleList = topAverageRatedMoviesByGender(userIdMappingRDD, "M", 10);
        System.out.println("female avg top 10");
        for (Tuple2<Double, String> tp2 : femaleList) {
            System.out.println(tp2._1() + "     " + tp2._2);
        }
        System.out.println("male avg top 10");
        for (Tuple2<Double, String> tp2 : maleList) {
            System.out.println(tp2._1() + "     " + tp2._2);
        }
    }

    /**
     * For one gender, averages the ratings per movie name and returns the highest-averaged movies.
     *
     * @param ratingsWithGender [userId, Tuple3<movieName, gender, rating>]
     * @param gender            gender code to keep, compared with equals ("F" or "M" as used above)
     * @param limit             maximum number of movies to return
     * @return up to {@code limit} [avgRating, movieName] pairs, in descending order of average rating
     */
    private static List<Tuple2<Double, String>> topAverageRatedMoviesByGender(
            JavaPairRDD<String, Tuple3<String, String, Double>> ratingsWithGender,
            final String gender,
            int limit) {
        // Keep only this gender's ratings, re-keyed as [movieName, Tuple2<rating, 1>].
        JavaPairRDD<String, Tuple2<Double, Integer>> movieNameRatingsCountRDD = ratingsWithGender
                .filter(new Function<Tuple2<String, Tuple3<String, String, Double>>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Tuple3<String, String, Double>> tp2) throws Exception {
                        return gender.equals(tp2._2._2());
                    }
                })
                .mapToPair(new PairFunction<Tuple2<String, Tuple3<String, String, Double>>, String, Tuple2<Double, Integer>>() {
                    @Override
                    public Tuple2<String, Tuple2<Double, Integer>> call(Tuple2<String, Tuple3<String, String, Double>> tp2)
                            throws Exception {
                        String movieName = tp2._2._1();
                        double rating = tp2._2._3();
                        Tuple2<Double, Integer> value = new Tuple2<Double, Integer>(rating, 1);
                        return new Tuple2<String, Tuple2<Double, Integer>>(movieName, value);
                    }
                });

        // Sum (rating, count) per movie, then turn it into [avgRating, movieName] so sortByKey orders by average.
        JavaPairRDD<Double, String> avgRatingsRDD = movieNameRatingsCountRDD
                .reduceByKey(SUM_RATINGS_AND_COUNTS)
                .mapToPair(new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, Double, String>() {
                    @Override
                    public Tuple2<Double, String> call(Tuple2<String, Tuple2<Double, Integer>> tp2) throws Exception {
                        double avgRating = tp2._2._1 / tp2._2._2;
                        String movieName = tp2._1;
                        return new Tuple2<Double, String>(avgRating, movieName);
                    }
                });

        return avgRatingsRDD.sortByKey(false).take(limit);
    }

    /**
     * Finds the 10 movies with the highest average rating over all users. Prints
     * "movieName movieId avgRating" for each, highest average first.
     *
     * @param moviesInfo  [movieId, movieName]
     * @param ratingsInfo (userId, movieId, rating) per rating record
     */
    public static void getTop10AvgUserRatings(JavaPairRDD<String, String> moviesInfo, JavaRDD<Tuple3<String, String, Double>> ratingsInfo) {

        // [movieId, Tuple2<rating, 1>]
        JavaPairRDD<String, Tuple2<Double, Integer>> movieIdRatingsPairRDD = ratingsInfo.mapToPair(
                new PairFunction<Tuple3<String, String, Double>, String, Tuple2<Double, Integer>>() {
                    @Override
                    public Tuple2<String, Tuple2<Double, Integer>> call(Tuple3<String, String, Double> tp3) throws Exception {
                        String key = tp3._2();
                        Tuple2<Double, Integer> value = new Tuple2<Double, Integer>(tp3._3(), 1);
                        return new Tuple2<String, Tuple2<Double, Integer>>(key, value);
                    }
                });

        // [movieId, Tuple2<sum(ratings), count(raters)>]
        JavaPairRDD<String, Tuple2<Double, Integer>> movieIdTotalRatingsCount =
                movieIdRatingsPairRDD.reduceByKey(SUM_RATINGS_AND_COUNTS);

        // [movieId, avgRating]
        JavaPairRDD<String, Double> movieAvgScore = movieIdTotalRatingsCount.mapValues(
                new Function<Tuple2<Double, Integer>, Double>() {
                    @Override
                    public Double call(Tuple2<Double, Integer> v1) throws Exception {
                        return v1._1 / v1._2;
                    }
                });

        // Join on movieId: [movieId, Tuple2<avgRating, movieName>]
        JavaPairRDD<String, Tuple2<Double, String>> movieIdAvgScoreName = movieAvgScore.join(moviesInfo);

        // [avgRating, Tuple2<movieId, movieName>] so sortByKey orders by average rating
        JavaPairRDD<Double, Tuple2<String, String>> avgScoreNameId = movieIdAvgScoreName.mapToPair(
                new PairFunction<Tuple2<String, Tuple2<Double, String>>, Double, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<Double, Tuple2<String, String>> call(Tuple2<String, Tuple2<Double, String>> tp2)
                            throws Exception {
                        double key = tp2._2._1;

                        Tuple2<String, String> value = new Tuple2<String, String>(tp2._1, tp2._2._2);
                        return new Tuple2<Double, Tuple2<String, String>>(key, value);
                    }
                });

        // Top 10 in descending order of average rating
        List<Tuple2<Double, Tuple2<String, String>>> tuple2List = avgScoreNameId.sortByKey(false).take(10);

        for (Tuple2<Double, Tuple2<String, String>> tp : tuple2List) {
            System.out.println(tp._2._2 + " " + tp._2._1 + " " + tp._1);
        }
    }
}